#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "table_proto.h"
#include "volume_proto.h"
#include "metadata.h"
#include "core.h"
#include "md_proto.h"
#include "md_map.h"
#include "net_global.h"
#include "volume_ctl.h"
#include "../storage/stor_rpc.h"
#include "volume.h"
#include "job_dock.h"
#include "ylog.h"
#include "dbg.h"
#include "iscsi.h"

STATIC int IO_FUNC __volume_proto_chunk_read(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                     int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *chunk_io;

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                ret = volume_proto_chunk_read(volume_proto, chunk_io);
                if (unlikely(ret)) {
#if 0
                        if (ret == ENOENT) {
                                if (!(chunk_io->io.flags & __FILE_ATTR_NOFILL__)) {
                                        DINFO("chunk "CHKID_FORMAT" read zero %s\n",
                                              CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                        mbuffer_appendzero(&chunk_io->buf, chunk_io->io.size);
                                        continue;
                                } else
                                        GOTO(err_ret, ret);
                        } else {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }
#else
                        DBUG("chunk "CHKID_FORMAT" %s\n", CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
#endif
                }
        }

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_merge(buf, &chunk_io->buf);
        }

        return 0;
err_ret:
        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_free(&chunk_io->buf);
        }
        return ret;
}


STATIC int __volume_proto_chunk_read_snapshot(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                            int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *chunk_io;

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                ret =  volume_proto_chunk_pre_read(volume_proto, chunk_io);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                ret = ENOKEY;
                                GOTO(err_ret, ret);
                        } else {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }
                }

                if (chunk_io->io.flags & __FILE_ATTR_PEEK__) {
                        //mbuffer_appendzero(&chunk_io->buf, chunk_io->io.size);
                        continue;
                }

                ret = chunk_proto_rep_read(chunk_io->chkinfo, chunk_io->chkstat, chunk_io->vfm,
                                           &chunk_io->io, &chunk_io->buf, NULL);
                if (unlikely(ret)) {
                        DINFO("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_merge(buf, &chunk_io->buf);
        }

        return 0;
err_ret:
        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_free(&chunk_io->buf);
        }
        return ret;
}

STATIC int __volume_proto_chunk_read_clone(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                         int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *chunk_io;

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];

                ret =  volume_proto_chunk_pre_read(volume_proto, chunk_io);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                ret = volume_proto_snapshot_chunk_redirect(volume_proto,
                                                                           &chunk_io->io.id,
                                                                           &chunk_io->buf,
                                                                           chunk_io->io.size,
                                                                           chunk_io->io.offset,
                                                                           TRUE);
                                if (unlikely(ret)) {
                                        if (ret == ENOENT) {
                                                if (!(chunk_io->io.flags & __FILE_ATTR_NOFILL__)) {
                                                        mbuffer_appendzero(&chunk_io->buf, chunk_io->io.size);
                                                        DINFO("chkid "CHKID_FORMAT" append zero, will continue.\n", CHKID_ARG(&chunk_io->io.id));
                                                        continue;
                                                } else
                                                        GOTO(err_ret, ret);
                                        } else {
                                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                                                CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                                GOTO(err_ret, ret);
                                        }
                                }

                                continue;
                        } else {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }
                }

                ret = chunk_proto_rep_read(chunk_io->chkinfo, chunk_io->chkstat, chunk_io->vfm,
                                       &chunk_io->io, &chunk_io->buf, NULL);
                if (unlikely(ret)) {
                        DINFO("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_merge(buf, &chunk_io->buf);
        }

        return 0;
err_ret:
        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
                mbuffer_free(&chunk_io->buf);
        }
        return ret;
}

STATIC int __volume_proto_chunk_write(volume_proto_t *volume_proto, io_opt_t *io_opt,
                                      chunk_io_t *_chunk_io, int chunk_count)
{
        int ret, i;
        chunk_io_t *chunk_io;

        ANALYSIS_BEGIN(0);
        
        io_opt->flag |= IO_OPT_CREAT;
        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];

                // 从chkstat.chkstat_clock获取clock，且递增chkstat.chkstat_clock
                ret = volume_proto_chunk_write(volume_proto, io_opt, chunk_io);
                if (unlikely(ret)) {
                        DBUG("chunk "CHKID_FORMAT" %s\n",
                             CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_QUEUE(0, IO_WARN, "__volume_proto_chunk_write");
        
        return 0;
err_ret:
        return ret;
}


STATIC int __volume_proto_chunk_write_clone(volume_proto_t *volume_proto, io_opt_t *io_opt,
                                          chunk_io_t *_chunk_io, int chunk_count)
{
        int ret, i, retry = 0;
        chunk_io_t *chunk_io;

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
        retry:
                ret =  volume_proto_chunk_write(volume_proto, io_opt, chunk_io);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                        retry1:
                                ret = volume_proto_snapshot_chunk_clone(volume_proto,
                                                                        &chunk_io->io.id, 0);
                                if (unlikely(ret)) {
                                        if (chunk_count > 1) {
                                                USLEEP_RETRY1(err_exit, ret, retry1, retry, 2, (1000 * 1000));
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                goto retry;
                        } else {
                                DWARN("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }
                }
        }

        return 0;
err_exit:
        DWARN("chunk "CHKID_FORMAT" %s, exit for reset clock\n",
              CHKID_ARG(&chunk_io->io.id), strerror(ret));
        volume_proto->ltime = 0;
        //EXIT(EAGAIN);
err_ret:
        return ret;
}

int volume_proto_rep_write(volume_proto_t *volume_proto, io_opt_t *io_opt, const io_t *io, const buffer_t *buf)
{
        int ret, chunk_count = 0, i;
        chunk_io_t *chunk_io, *tmp;

        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("write file "CHKID_FORMAT" off %ju size %u\n",
             CHKID_ARG(&volume_proto->chkid), io->offset, io->size);

        ANALYSIS_BEGIN(0);

        if (unlikely(io->size == 0)) {
                goto out;
        }

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*chunk_io)  * LICH_SPLIT_MAX  < MEM_CACHE_SIZE8K, "split");
#endif

#if RAMDISK_ENABLE
        buffer_t cmp;

        mbuffer_init(&cmp, 0);
        mbuffer_clone(&cmp, buf);
#endif

        chunk_io = mem_cache_calloc(MEM_CACHE_8K, 1);

        ret = volume_proto_io_split(volume_proto, io,  (buffer_t *)buf,
                                      chunk_io, &chunk_count);
        if (unlikely(ret)) {
                GOTO(err_split, ret);
        }

        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                YASSERT(0);
        } else if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_CLONE__)) {
                // TODO LSV
                ret = __volume_proto_chunk_write_clone(volume_proto, io_opt, chunk_io, chunk_count);
                if (unlikely(ret))
                        GOTO(err_split, ret);
        } else {
                ret = __volume_proto_chunk_write(volume_proto, io_opt, chunk_io, chunk_count);
                if (unlikely(ret))
                        GOTO(err_split, ret);
        }

        for (i = 0; i < chunk_count; i++) {
                tmp = &chunk_io[i];
                YASSERT(tmp->buf.len);
                mbuffer_merge((buffer_t *)buf, &tmp->buf);
        }

#if RAMDISK_ENABLE
        YASSERT(mbuffer_compare(&cmp, buf) == 0);
        mbuffer_free(&cmp);

        ret = ramdisk_pwrite(volume_proto->ramdisk_fd, &volume_proto->chkid, buf, io->size, io->offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        ANALYSIS_QUEUE(0, IO_WARN, "volume_write");

        mem_cache_free(MEM_CACHE_8K, chunk_io);

out:
        return 0;
err_split:
        for (i = 0; i < chunk_count; i++) {
                tmp = &chunk_io[i];
                YASSERT(tmp->buf.len);
                mbuffer_merge((buffer_t *)buf, &tmp->buf);
        }
#if RAMDISK_ENABLE
        YASSERT(mbuffer_compare(&cmp, buf) == 0);
        mbuffer_free(&cmp);
#endif

        mem_cache_free(MEM_CACHE_8K, chunk_io);
err_ret:
        return ret;
}

int IO_FUNC volume_proto_rep_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)//, size_t size, off_t offset);
{
        int ret, chunk_count;
        chunk_io_t *chunk_io;//[LICH_SPLIT_MAX];

        DBUG("read file "CHKID_FORMAT" size %llu off %llu\n",
              CHKID_ARG(&volume_proto->chkid), (LLU)io->size, (LLU)io->offset);

        YASSERT(volume_proto->chkid.id = io->id.id);
        if (unlikely(io->size == 0)) {
                goto out;
        }

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*chunk_io)  * LICH_SPLIT_MAX  < MEM_CACHE_SIZE8K, "split");
#endif

        chunk_io = mem_cache_calloc(MEM_CACHE_8K, 1);

        ANALYSIS_BEGIN(0);

        ret = volume_proto_io_split(volume_proto, io, NULL,
                                      chunk_io, &chunk_count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = __volume_proto_chunk_read_snapshot(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_CLONE__)) {
                ret = __volume_proto_chunk_read_clone(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = __volume_proto_chunk_read(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, 1000 * 1000, "volume_read");

        mem_cache_free(MEM_CACHE_8K, chunk_io);

out:
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_8K, chunk_io);
        return ret;        
}
